package com.intct.func;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

/**
 * @author gufg
 * @since 2025-10-25 10:11
 */
public class SqlAndApiConvert {
    public static DataStream<Row> sqlConvertApi(DataStream<Row> rowDataStream) {
        // 将撤回流中的-U -D 过滤掉, +U转为+I
        SingleOutputStreamOperator<Row> filterDS = rowDataStream.filter(row -> {
            RowKind rowKind = row.getKind();
            String rowkKindStr = rowKind.shortString();
            boolean isType = "+I".equals(rowkKindStr) || "+U".equals(rowkKindStr);

            if ("-D".equals(rowkKindStr)) {
                row.setField("op", "d");
            } else if ("-U".equals(rowkKindStr)) {
                row.setField("op", "d");
            } else if ("+U".equals(rowkKindStr)) {
                row.setField("op",  "c");
            }

            // 将数据中的数据变更改为增加
            row.setKind(RowKind.INSERT);
            // 过滤掉-D -U
            return isType;
        });

        return null;
    }

    public static Table apiConvertSql() {
        return null;
    }
}
